Overview

The fetch_all_ohlcv.py script retrieves historical daily OHLCV (candlestick) data for all stocks. It performs smart incremental updates, detecting existing data and fetching only the missing days, and integrates a hybrid live snapshot so that today’s intraday data is included.

Purpose

Fetches historical price and volume data:
  • Open, High, Low, Close prices (daily candles)
  • Volume (daily trading volume)
  • Incremental Updates - Only fetches missing dates
  • Live Integration - Merges today’s live data with historical data
  • Chunked Fetching - Handles large date ranges in 180-day chunks
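
The chunking logic above can be sketched as plain date arithmetic. The helper below is illustrative only (split_into_chunks is not a function in the script); it splits a range into 180-day windows, walking backwards from the end the way the fetch loop does:

```python
CHUNK_DAYS = 180
DAY = 86400  # seconds per day

def split_into_chunks(start_ts, end_ts, chunk_days=CHUNK_DAYS):
    """Split [start_ts, end_ts] into (start, end) windows of at most
    chunk_days each, walking backwards from end_ts."""
    chunks = []
    ptr = end_ts
    while ptr > start_ts:
        c_start = max(start_ts, ptr - chunk_days * DAY)
        chunks.append((c_start, ptr))
        ptr = c_start - DAY  # step past the last fetched day
    return chunks

# A 2-year range splits into five chunks of at most 180 days each
two_years = split_into_chunks(0, 2 * 365 * DAY)
print(len(two_years))  # 5
```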

API Endpoints

Historical OHLCV Endpoint

  • URL (string, required): https://openweb-ticks.dhan.co/getDataH
  • Method (string, required): POST

Live Snapshot Endpoint

  • URL (string, required): https://ow-scanx-analytics.dhan.co/customscan/fetchdt
  • Method (string, required): POST

Request Payloads

Historical Data Request

{
  "EXCH": "NSE",
  "SYM": "<SYMBOL>",
  "SEG": "E",
  "INST": "EQUITY",
  "SEC_ID": "<Sid>",
  "EXPCODE": 0,
  "INTERVAL": "D",
  "START": 215634600,
  "END": <CURRENT_TIMESTAMP>
}
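
A minimal sketch of building this payload in Python (build_history_payload is an illustrative helper, not a function from the script, and the SEC_ID value shown is a placeholder):

```python
import time

def build_history_payload(sym, sec_id, start=215634600, end=None):
    """Build the historical OHLCV request body for a daily-candle query."""
    return {
        "EXCH": "NSE",
        "SYM": sym,
        "SEG": "E",
        "INST": "EQUITY",
        "SEC_ID": sec_id,
        "EXPCODE": 0,
        "INTERVAL": "D",
        "START": start,
        "END": end if end is not None else int(time.time()),
    }

# Sid value here is illustrative; real Sids come from master_isin_map.json
payload = build_history_payload("RELIANCE", "<Sid>")
```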

Parameters

  • EXCH (string, required, default: "NSE") - Exchange code (NSE for National Stock Exchange)
  • SYM (string, required) - Stock symbol (e.g., “RELIANCE”)
  • SEG (string, required, default: "E") - Market segment (E for Equity)
  • INST (string, required, default: "EQUITY") - Instrument type
  • SEC_ID (string, required) - Security ID (Sid) from master_isin_map.json
  • EXPCODE (number, required, default: 0) - Expiry code (0 for equity stocks)
  • INTERVAL (string, required, default: "D") - Candle interval:
    • D - Daily candles (most common)
    • W - Weekly candles
    • M - Monthly candles
    • 1, 5, 15, 30, 60 - Intraday minutes
  • START (number, required) - Start timestamp (Unix epoch). Script uses 215634600 (Oct 31, 1976) to fetch maximum available history.
  • END (number, required) - End timestamp (Unix epoch). Typically the current timestamp.
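
The fixed START value can be verified with the standard library (a quick sanity check, not part of the script):

```python
from datetime import datetime, timezone
import time

# The magic START constant decodes to Oct 31, 1976 (UTC)
start = datetime.fromtimestamp(215634600, tz=timezone.utc)
print(start.date())  # 1976-10-31

# A typical END value: the current Unix timestamp
end = int(time.time())
```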

Live Snapshot Request

{
  "data": {
    "sort": "Volume",
    "sorder": "desc",
    "count": 5000,
    "fields": ["Sym", "Open", "High", "Low", "Ltp", "Volume"],
    "params": [{"field": "Exch", "op": "", "val": "NSE"}]
  }
}

Output Files

ohlcv_data/{SYMBOL}.csv (CSV)
Per-stock OHLCV data in CSV format:
Date,Open,High,Low,Close,Volume
2024-01-01,2450.00,2480.50,2445.00,2475.30,5234567
2024-01-02,2476.00,2490.00,2470.00,2485.60,4123456
  • Date: YYYY-MM-DD format
  • Open: Opening price
  • High: Intraday high
  • Low: Intraday low
  • Close: Closing price (or LTP for today)
  • Volume: Trading volume
Files are deduplicated by date and sorted chronologically.
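
Because the files are plain CSV with headers, they load directly with the standard library (or pandas). A short sketch using the sample rows above:

```python
import csv
import io

# Sample data matching the documented CSV format
sample = """Date,Open,High,Low,Close,Volume
2024-01-01,2450.00,2480.50,2445.00,2475.30,5234567
2024-01-02,2476.00,2490.00,2470.00,2485.60,4123456
"""

rows = list(csv.DictReader(io.StringIO(sample)))
closes = [float(r["Close"]) for r in rows]
daily_return = (closes[1] - closes[0]) / closes[0]
print(f"{daily_return:.4%}")  # roughly 0.42%
```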

Function Signatures

Main Fetching Function

def fetch_single_stock(sym, details, live_snapshot=None):
    """
    Fetches OHLCV data for a single stock with incremental update logic.
    
    Args:
        sym (str): Stock symbol
        details (dict): Stock metadata with Sid, Exch, Seg, Inst
        live_snapshot (dict, optional): Today's live OHLCV data
        
    Returns:
        str: Status - "success", "uptodate", or "error"
        
    Process:
        1. Check if CSV exists and get last date
        2. Determine start date (2 years ago or day after last date)
        3. Fetch missing history in 180-day chunks
        4. Append today's live snapshot if available
        5. Merge with existing data and deduplicate by date
        6. Save to CSV
    """

Live Snapshot Function

def get_live_snapshots():
    """
    Fetches today's live OHLCV snapshot for all stocks.
    
    Returns:
        dict: {Symbol: {Open, High, Low, Ltp, Volume}}
        
    Used to fill today's data gap since historical API lags by 1 day.
    """

Dependencies

Python Packages
  • requests - HTTP client
  • json - JSON processing
  • os - File operations
  • time - Timestamp calculations
  • csv - CSV file reading/writing
  • datetime - Date parsing and formatting
  • concurrent.futures.ThreadPoolExecutor - Parallel execution
Local Modules
  • pipeline_utils.BASE_DIR - Base directory path
  • pipeline_utils.get_headers() - API headers with Origin header
Input Files
  • dhan_data_response.json - Raw market data with Sid field

Configuration

  • CHUNK_DAYS (number, default: 180) - Number of days per API request chunk. Prevents timeouts on large date ranges.
  • MAX_THREADS (number, default: 15) - Number of concurrent download threads.

Code Example

import json
import requests
import os
import time
import csv
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from pipeline_utils import BASE_DIR, get_headers

INPUT_FILE = os.path.join(BASE_DIR, "dhan_data_response.json")
OUTPUT_DIR = os.path.join(BASE_DIR, "ohlcv_data")
CHUNK_DAYS = 180
MAX_THREADS = 15
TICK_API_URL = "https://openweb-ticks.dhan.co/getDataH"

def get_live_snapshots():
    """Fetch today's live OHLCV for all stocks."""
    print("Fetching live snapshots...")
    payload = {
        "data": {
            "sort": "Volume", "sorder": "desc", "count": 5000,
            "fields": ["Sym", "Open", "High", "Low", "Ltp", "Volume"],
            "params": [{"field": "Exch", "op": "", "val": "NSE"}]
        }
    }
    try:
        response = requests.post(
            "https://ow-scanx-analytics.dhan.co/customscan/fetchdt",
            json=payload,
            headers=get_headers(include_origin=True),
            timeout=15
        )
        if response.status_code == 200:
            return {i['Sym']: i for i in response.json().get('data', [])}
    except (requests.RequestException, ValueError):
        pass
    return {}

def fetch_history_chunk(payload):
    """Fetch a single chunk of historical OHLCV."""
    try:
        response = requests.post(TICK_API_URL, json=payload, headers=get_headers(include_origin=True), timeout=15)
        if response.status_code == 200:
            data = response.json().get("data", {})
            times = data.get("Time", [])
            if times:
                o, h, l, c, v = data.get("o", []), data.get("h", []), data.get("l", []), data.get("c", []), data.get("v", [])
                rows = []
                for i in range(len(times)):
                    t = times[i]
                    dt_str = t if isinstance(t, str) else datetime.fromtimestamp(t).strftime("%Y-%m-%d")
                    rows.append({'Date': dt_str, 'Open': o[i], 'High': h[i], 'Low': l[i], 'Close': c[i], 'Volume': v[i]})
                return rows
    except (requests.RequestException, ValueError, IndexError):
        pass
    return []

def fetch_single_stock(sym, details, live_snapshot=None):
    output_path = os.path.join(OUTPUT_DIR, f"{sym}.csv")
    today_str = datetime.now().strftime("%Y-%m-%d")
    
    # Default: 2 years of history
    global_start_ts = int(time.time()) - (2 * 365 * 86400)
    target_start = global_start_ts
    
    existing_rows = []
    if os.path.exists(output_path):
        try:
            with open(output_path, "r") as f:
                existing_rows = list(csv.DictReader(f))
                if existing_rows:
                    last_date = existing_rows[-1]["Date"]
                    last_dt = datetime.strptime(last_date, "%Y-%m-%d")
                    target_start = int(last_dt.timestamp()) + 86400
        except (OSError, KeyError, ValueError):
            pass

    # Fetch missing history in chunks
    new_rows = []
    current_end = int(time.time())
    
    if target_start < current_end - 86400:
        chunk_ptr = current_end
        while chunk_ptr > target_start:
            c_start = max(target_start, chunk_ptr - (CHUNK_DAYS * 86400))
            payload = {
                "EXCH": details["Exch"], "SYM": sym, "SEG": details["Seg"],
                "INST": details["Inst"], "SEC_ID": details["Sid"],
                "EXPCODE": 0, "INTERVAL": "D", "START": int(c_start), "END": int(chunk_ptr)
            }
            chunk_rows = fetch_history_chunk(payload)
            if chunk_rows:
                new_rows.extend(chunk_rows)
            
            chunk_ptr = c_start - 86400

    # Add today's live data
    if live_snapshot:
        s = live_snapshot
        today_row = {
            'Date': today_str,
            'Open': s.get('Open', 0),
            'High': s.get('High', 0),
            'Low': s.get('Low', 0),
            'Close': s.get('Ltp', 0),
            'Volume': s.get('Volume', 0)
        }
        new_rows.append(today_row)

    if not new_rows:
        return "uptodate"

    # Merge and deduplicate
    merged = {r['Date']: r for r in existing_rows + new_rows}
    final_rows = sorted(merged.values(), key=lambda x: x['Date'])

    # Save to CSV
    with open(output_path, "w", newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['Date', 'Open', 'High', 'Low', 'Close', 'Volume'])
        writer.writeheader()
        writer.writerows(final_rows)
    
    return "success"

def main():
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    with open(INPUT_FILE, "r") as f:
        dhan_data = json.load(f)

    stocks = {
        item["Sym"]: {"Sid": item["Sid"], "Exch": item.get("Exch", "NSE"), "Inst": "EQUITY", "Seg": "E"}
        for item in dhan_data if item.get("Sym") and item.get("Sid")
    }

    live_snapshots = get_live_snapshots()

    print(f"Syncing OHLCV for {len(stocks)} stocks...")
    
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = {executor.submit(fetch_single_stock, s, stocks[s], live_snapshots.get(s)): s for s in stocks}
        results = {}
        for future in as_completed(futures):
            status = future.result()
            results[status] = results.get(status, 0) + 1

    print(f"Done: {results}")

if __name__ == "__main__":
    main()

Usage

python3 fetch_all_ohlcv.py

Performance

  • Execution Time:
    • First run (full 2-year history): ~25-30 minutes for 2,775 stocks
    • Incremental updates: ~2-5 minutes (only fetches missing days)
  • API Calls: Variable (depends on missing date ranges)
  • Output: 2,775 CSV files in ohlcv_data/ directory
  • Concurrency: 15 parallel threads
  • Chunk Size: 180-day chunks to prevent timeouts

Incremental Update Logic

  1. Check Existing File: If {SYMBOL}.csv exists, read last date
  2. Calculate Gap: Determine missing dates from last date to today
  3. Chunk Fetching: Download missing data in 180-day chunks
  4. Merge: Combine existing + new data, deduplicate by date
  5. Live Integration: Append today’s live snapshot
  6. Save: Overwrite CSV with complete dataset
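
Steps 1-2 reduce to a small piece of date arithmetic. The sketch below mirrors the logic in fetch_single_stock (resume_timestamp is an illustrative helper, not a function in the script):

```python
import time
from datetime import datetime

DAY = 86400  # seconds per day

def resume_timestamp(last_date, default_days=2 * 365):
    """Return the epoch timestamp to resume fetching from: the day after
    last_date if a CSV already exists, otherwise ~2 years ago."""
    if last_date is None:
        return int(time.time()) - default_days * DAY
    return int(datetime.strptime(last_date, "%Y-%m-%d").timestamp()) + DAY

# Existing file ending 2024-01-01 resumes from 2024-01-02;
# a fresh stock falls back to the full 2-year lookback.
ts = resume_timestamp("2024-01-01")
```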

Hybrid Live Integration

The historical API typically lags by one day. The script bridges this gap by:
  1. Fetching live snapshots for all stocks at script start
  2. Appending today’s {Open, High, Low, Ltp, Volume} to each stock’s data
  3. Using Ltp (Last Traded Price) as today’s Close
  4. Deduplicating by date during merge (live data overwrites if date exists)
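
The overwrite-on-merge behaviour in step 4 falls out of building a dict keyed by date, with new rows applied last (toy data for illustration):

```python
existing_rows = [
    {"Date": "2024-01-02", "Close": "2480.00"},  # stale value from an earlier run
]
new_rows = [
    {"Date": "2024-01-02", "Close": "2485.60"},  # live snapshot (Ltp as Close)
]

# Later rows win: live data replaces an existing row with the same date
merged = {r["Date"]: r for r in existing_rows + new_rows}
final_rows = sorted(merged.values(), key=lambda r: r["Date"])
print(final_rows[0]["Close"])  # 2485.60
```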

Notes

  • Requires Sid: Stocks without Sid are skipped
  • Smart Updates: Only fetches missing date ranges (not full history on every run)
  • Maximum History: Uses start timestamp 215634600 (1976) to force API to return all available data
  • Date Deduplication: Prevents duplicate dates when merging existing + new data
  • CSV Format: Standard CSV with headers for easy import into analysis tools
  • Default Lookback: 2 years (500 trading days) for technical indicator calculations (200 MA, etc.)